import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';

class PublishSubjectTestPage extends TestPage {
  PublishSubjectTestPage(super.title) {
    group('PublishSubject', () {
      test('emits items to every subscriber', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        scheduleMicrotask(() {
          subject.add(1);
          subject.add(2);
          subject.add(3);
          subject.close();
        });

        expect(subject.stream);
      });

      test(
          'emits items to every subscriber that subscribe directly to the Subject',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            scheduleMicrotask(() {
              subject.add(1);
              subject.add(2);
              subject.add(3);
              subject.close();
            });

            expect(subject);
          });

      test('emits done event to listeners when the subject is closed', () async {
        final subject = PublishSubject<int>();

        expect(subject.isClosed);

        scheduleMicrotask(() => subject.add(1));
        scheduleMicrotask(() => subject.close());

        expect(subject.stream);
        expect(subject.isClosed);
      });

      test(
          'emits done event to listeners when the subject is closed (listen directly on Subject)',
              () async {
            final subject = PublishSubject<int>();

            expect(subject.isClosed);

            scheduleMicrotask(() => subject.add(1));
            scheduleMicrotask(() => subject.close());

            expect(subject);
            expect(subject.isClosed);
          });

      test('emits error events to subscribers', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        scheduleMicrotask(() => subject.addError(Exception()));

        expect(subject.stream);
      });

      test('emits error events to subscribers (listen directly on Subject)',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            scheduleMicrotask(() => subject.addError(Exception()));

            expect(subject);
          });

      test('emits the items from addStream', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        scheduleMicrotask(
                () => subject.addStream(Stream.fromIterable(const [1, 2, 3])));

        expect(subject.stream);
      });

      test('allows items to be added once addStream is complete', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        await subject.addStream(Stream.fromIterable(const [1, 2]));
        scheduleMicrotask(() => subject.add(3));

        expect(subject.stream);
      });

      test('allows items to be added once addStream completes with an error',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            unawaited(subject
                .addStream(Stream<int>.error(Exception()), cancelOnError: true)
                .whenComplete(() => subject.add(1)));

            expect(subject.stream);
          });

      test('does not allow events to be added when addStream is active',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            // Purposely don't wait for the future to complete, then try to add items
            // ignore: unawaited_futures
            subject.addStream(Stream.fromIterable(const [1, 2, 3]));

            expect(() => subject.add(1));
          });

      test('does not allow errors to be added when addStream is active',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            // Purposely don't wait for the future to complete, then try to add items
            // ignore: unawaited_futures
            subject.addStream(Stream.fromIterable(const [1, 2, 3]));

            expect(() => subject.addError(Error()));
          });

      test('does not allow subject to be closed when addStream is active',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            // Purposely don't wait for the future to complete, then try to add items
            // ignore: unawaited_futures
            subject.addStream(Stream.fromIterable(const [1, 2, 3]));

            expect(() => subject.close());
          });

      test(
          'does not allow addStream to add items when previous addStream is active',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            // Purposely don't wait for the future to complete, then try to add items
            // ignore: unawaited_futures
            subject.addStream(Stream.fromIterable(const [1, 2, 3]));

            expect(() => subject.addStream(Stream.fromIterable(const [1])));
          });

      test('returns onListen callback set in constructor', () async {
        void testOnListen() {}
        // ignore: close_sinks
        final subject = PublishSubject<int>(onListen: testOnListen);

        expect('${subject.onListen}, $testOnListen');
      });

      test('sets onListen callback', () async {
        void testOnListen() {}
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        expect(subject.onListen);

        subject.onListen = testOnListen;

        expect('${subject.onListen}, $testOnListen');
      });

      test('returns onCancel callback set in constructor', () async {
        Future<void> onCancel() => Future<void>.value(null);
        // ignore: close_sinks
        final subject = PublishSubject<int>(onCancel: onCancel);

        expect('${subject.onCancel}, $onCancel');
      });

      test('sets onCancel callback', () async {
        void testOnCancel() {}
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        expect(subject.onCancel);

        subject.onCancel = testOnCancel;

        expect('${subject.onCancel}, $testOnCancel');
      });

      test('reports if a listener is present', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        expect(subject.hasListener);

        subject.stream.listen(null);

        expect(subject.hasListener);
      });

      test('onPause unsupported', () {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        expect(subject.isPaused);
        expect(() => subject.onPause);
        expect(() => subject.onPause = () {});
      });

      test('onResume unsupported', () {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        expect(() => subject.onResume);
        expect(() => subject.onResume = () {});
      });

      test('returns controller sink', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        expect(subject.sink);
      });

      test('correctly closes done Future', () async {
        final subject = PublishSubject<int>();

        scheduleMicrotask(() => subject.close());

        expect(subject.done);
      });

      test('can be listened to multiple times', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();
        final stream = subject.stream;

        scheduleMicrotask(() => subject.add(1));
        expect(stream);

        scheduleMicrotask(() => subject.add(2));
        expect(stream);
      });

      test('always returns the same stream', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();

        expect(subject.stream);
      });

      test('adding to sink has same behavior as adding to Subject itself',
              () async {
            // ignore: close_sinks
            final subject = PublishSubject<int>();

            scheduleMicrotask(() {
              subject.sink.add(1);
              subject.sink.add(2);
              subject.sink.add(3);
              subject.sink.close();
            });

            expect(subject.stream);
          });

      test('is always treated as a broadcast Stream', () async {
        // ignore: close_sinks
        final subject = PublishSubject<int>();
        final stream = subject.asyncMap((event) => Future.value(event));

        expect(subject.isBroadcast);
        expect(stream.isBroadcast);
      });

      test('stream returns a read-only stream', () async {
        final subject = PublishSubject<int>();

        // streams returned by PublishSubject are read-only stream,
        // ie. they don't support adding events.
        expect(subject.stream);
        expect(subject.stream);

        // PublishSubject.stream is a broadcast stream
            {
          final stream = subject.stream;
          expect(stream.isBroadcast);

          scheduleMicrotask(() => subject.add(1));
          expect(stream);

          scheduleMicrotask(() => subject.add(1));
          expect(stream);
        }

        // streams returned by the same subject are considered equal,
        // but not identical
        expect(identical(subject.stream, subject.stream));
        expect(subject.stream == subject.stream);
      });
    });
  }

}